跳到主要内容

Go 使用 RocketMQ

go get github.com/apache/rocketmq-client-go/v2 

编写简单的测试代码

package main

import (
"context"
"fmt"
"os"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/admin"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

// main runs the demo end to end: it creates the topic, publishes one message
// to it, and then subscribes to the topic to consume that message.
func main() {
	const (
		topic   = "testTopic01"
		payload = "hello world2022send test ,rocketmq go client! too,是的"
	)

	// Step 1: create the topic. According to the tutorial this step is
	// optional, because sending to a topic that does not exist creates it.
	CreateTopic(topic)

	// Step 2: the producer sends a message to the topic.
	SendSyncMessage(payload)

	// Step 3: the consumer subscribes to the topic and consumes the message.
	SubcribeMessage()
}

// CreateTopic asks the RocketMQ cluster behind the hard-coded name server to
// create a topic called topicName.
//
// Failures are reported on stdout and are not fatal: the function simply
// returns so the caller can carry on with the rest of the demo.
func CreateTopic(topicName string) {
	endPoint := []string{"192.168.120.78:9876"}

	// Build an admin client that resolves the name server from the fixed list.
	testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(endPoint)))
	if err != nil {
		fmt.Printf("connection error: %s\n", err.Error())
		// Without an admin client there is nothing to call CreateTopic on;
		// continuing would dereference a nil admin.
		return
	}
	// Release the admin client's resources once the request has finished.
	defer func() {
		if closeErr := testAdmin.Close(); closeErr != nil {
			fmt.Printf("close admin error: %s\n", closeErr.Error())
		}
	}()

	if err := testAdmin.CreateTopic(context.Background(), admin.WithTopicCreate(topicName)); err != nil {
		fmt.Printf("createTopic error: %s\n", err.Error())
	}
}

// SendSyncMessage publishes message synchronously to the "testTopic01" topic
// through the hard-coded name server and prints the send result.
//
// If the producer cannot be created or started, the process exits with
// status 1. A failed send is only reported on stdout.
func SendSyncMessage(message string) {
	endPoint := []string{"192.168.120.78:9876"}

	// Create a producer instance.
	p, err := rocketmq.NewProducer(
		producer.WithNameServer(endPoint),
		producer.WithRetry(2),
		producer.WithGroupName("ProducerGroupName"),
	)
	if err != nil {
		// Previously this error was discarded, which left p nil and caused a
		// panic on p.Start() below.
		fmt.Printf("create producer error: %s\n", err.Error())
		os.Exit(1)
	}

	// Start the producer.
	if err := p.Start(); err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	// Shut the started producer down when the send has completed so that its
	// client resources are released.
	defer func() {
		if stopErr := p.Shutdown(); stopErr != nil {
			fmt.Printf("shutdown producer error: %s\n", stopErr.Error())
		}
	}()

	// Send the message and wait for the broker's response.
	result, err := p.SendSync(context.Background(), &primitive.Message{
		Topic: "testTopic01",
		Body:  []byte(message),
	})
	if err != nil {
		fmt.Printf("send message error: %s\n", err.Error())
		return
	}
	fmt.Printf("send message seccess: result=%s\n", result.String())
}

// SubcribeMessage creates a clustering-mode push consumer, subscribes it to
// the "testTopic01" topic, prints every message delivered to the callback,
// and then shuts the consumer down.
//
// If the consumer cannot be created or started, the process exits with
// status -1. A failed subscription is reported on stdout and the function
// returns without starting the consumer.
func SubcribeMessage() {
	endPoint := []string{"192.168.120.78:9876"}

	// Create a consumer instance.
	c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer(endPoint),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithGroupName("ConsumerGroupName"),
	)
	if err != nil {
		// Previously this error was overwritten by the Subscribe result
		// without being inspected, so a nil consumer could be used.
		fmt.Printf("create consumer error: %s\n", err.Error())
		os.Exit(-1)
	}

	// Subscribe to the topic; the callback prints each received message and
	// acknowledges the batch as successfully consumed.
	err = c.Subscribe("testTopic01", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			fmt.Printf("subscribe callback : %v \n", msg)
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Printf("subscribe message error: %s\n", err.Error())
		// There is no point in starting a consumer that has no subscription.
		return
	}

	// Start the consumer.
	if err := c.Start(); err != nil {
		fmt.Printf("consumer start error: %s\n", err.Error())
		os.Exit(-1)
	}

	// NOTE(review): the consumer is shut down right after Start returns, so
	// only messages delivered in that short window reach the callback. That
	// suffices for this demo; a real service would block here (for example on
	// a signal or context) before shutting down.
	if err := c.Shutdown(); err != nil {
		fmt.Printf("shutdown Consumer error: %s\n", err.Error())
	}
}

运行代码有几种方式:可以在命令行执行 go run 并指定文件名,也可以在 IDE 中找到 main 函数直接点击运行,还可以像配置启动类那样在运行配置中指定入口文件。运行日志如下:

GOROOT=/usr/local/go #gosetup
GOPATH=/Users/dxm/go #gosetup
/usr/local/go/bin/go build -o /private/var/folders/bw/xvcy7d7j7nscgrtsbk1lmc500000gn/T/GoLand/___go_build_awesomeProject awesomeProject #gosetup
/private/var/folders/bw/xvcy7d7j7nscgrtsbk1lmc500000gn/T/GoLand/___go_build_awesomeProject
ERRO[0005] create topic error broker="192.168.120.78:10911" topic=testTopic01 underlayError="request timeout"
createTopic error: request timeout
WARN[0005] query topic route from server error underlayError="topic not exist"
WARN[0005] queryTopicRouteInfoFromServer return nil topic=testTopic01
INFO[0005] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":7,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="<nil>" topic=testTopic01
send message seccess: result=SendResult [sendStatus=0, msgIds=C0A8784EB39C0000000074f884e00001, offsetMsgId=C0A8784E00002A9D000000000000021E, queueOffset=0, messageQueue=MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=1]]
INFO[0005] the consumer start beginning consumerGroup=ConsumerGroupName messageModel=Clustering unitMode=false
WARN[0005] query topic route from server error underlayError="topic not exist"
WARN[0005] queryTopicRouteInfoFromServer return nil topic=testTopic01
INFO[0005] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":1,\"writeQueueNums\":1,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="<nil>" topic="%RETRY%ConsumerGroupName"
INFO[0005] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":7,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" topic=testTopic01
INFO[0005] receive broker's notification to consumer group consumerGroup=ConsumerGroupName
INFO[0005] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":1,\"writeQueueNums\":1,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="<nil>" topic="%RETRY%ConsumerGroupName"
WARN[0005] delete mq from offset table MessageQueue="MessageQueue [topic=%RETRY%ConsumerGroupName, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName
WARN[0005] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=%RETRY%ConsumerGroupName, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] the MessageQueue changed, version also updated changeTo=1655975244827267000 changedFrom=0
INFO[0005] The PullThresholdForTopic is changed changeTo=102400 changedFrom=102400
INFO[0005] The PullThresholdSizeForTopic is changed changeTo=51200 changedFrom=51200
INFO[0005] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="<nil>" topic=testTopic01
WARN[0005] delete mq from offset table MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=1]" consumerGroup=ConsumerGroupName
WARN[0005] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=1]" consumerGroup=ConsumerGroupName offset=0
WARN[0005] delete mq from offset table MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=2]" consumerGroup=ConsumerGroupName
WARN[0005] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=2]" consumerGroup=ConsumerGroupName offset=0
WARN[0005] delete mq from offset table MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=3]" consumerGroup=ConsumerGroupName
subscribe callback : [Message=[topic=testTopic01, body=hello world2022send test ,rocketmq go client! too, Flag=0, properties=map[CONSUME_START_TIME:1655975244853 MAX_FFSET:1 MIN_OFFSET:0 UNIQ_KEY:C0A8784EB39C0000000074f884e00001], TransactionId=], MsgId=C0A8784EB39C0000000074f884e00001, OffsetMsgId=C0A8784E00002A9D000000000000021E,QueueId=1, StoreSize=196, QueueOffset=0, SysFlag=0, BornTimestamp=1655975244765, BornHost=172.23.0.1:60480, StoreTimestamp=1655975244804, StoreHost=192.168.120.78:10909, CommitLogOffset=542, BodyCRC=1591959001, ReconsumeTimes=0, PreparedTransactionOffset=0]
WARN[0005] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=3]" consumerGroup=ConsumerGroupName offset=0
WARN[0005] delete mq from offset table MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName
WARN[0005] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] the MessageQueue changed, version also updated changeTo=1655975244861143000 changedFrom=0
INFO[0005] The PullThresholdForTopic is changed changeTo=20480 changedFrom=102400
INFO[0005] The PullThresholdSizeForTopic is changed changeTo=10240 changedFrom=51200
INFO[0005] push consumer close pullConsumer listener. consumerGroup=ConsumerGroupName
INFO[0005] update offset to broker success MessageQueue="MessageQueue [topic=%RETRY%ConsumerGroupName, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] update offset to broker success MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=1]" consumerGroup=ConsumerGroupName offset=1
INFO[0005] update offset to broker success MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=2]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] update offset to broker success MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=3]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] update offset to broker success MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] will remove client from clientMap clientID=192.168.120.78@45980

Process finished with the exit code 0

从日志中可以看到不少 ERRO/WARN 信息,但程序实际上执行成功了。这是因为在实际开发中并不需要显式调用 CreateTopic 函数:发送消息时如果 topic 不存在,会自动创建(注:有些云服务商的 MQ 需要手动创建 topic,同样不需要调用 CreateTopic 函数,所以这里仅仅是 Demo 示例)。

我们再来看看 message:

References